feat(core): support sort manifest entries by partition#7866
Conversation
Aitozi
left a comment
There was a problem hiding this comment.
LGTM +1, Please resolve the conflict
|
CC @JingsongLi for another look |
|
Hi @baiyangtx @Aitozi , please take a look to #7842 , I didn't look closely, maybe the abilities of these two PR implementations are the same. |
Thanks @discivigour for working on this too. My PR #7866 takes a different approach — sorting inline during the existing full compaction path, rather than a separate post-commit sort pipeline. Key differences:
Happy to align with the community's direction, but I believe the inline approach is simpler and less invasive. |
- Sort delta entries during commit - Sort base entries via full merge See merge request: !854
The manifest entry sorting feature introduced in the parent commit used FileKind.DELETE in mergeUnsorted() and mergeSortedByPartition() without importing org.apache.paimon.manifest.FileKind, causing all modules that depend on paimon-core to fail compilation.
The sorting refactor eliminated the single call to sequentialBatchedExecute, but left the static import, causing Checkstyle to fail the build.
- FileStoreCommitImpl: use options.xxx() instead of local variables (manifestTargetSize, manifestMergeMinCount, etc. don't exist) - FileStoreCommitImpl: replace coreOptions with options - ManifestFileMerger: replace entry.toBytes() with entrySerializer.serializeToBytes() (toBytes() doesn't exist on apache/master) - ManifestFileMerger: extract createPartitionRecordComparator() to handle partition row comparison (InternalRowUtils.compare takes DataTypeRoot, not RowType; newRecordComparator takes boolean[], not boolean) - Remove stale reader Function referencing deleted readForFullCompaction - Add missing DataType import, remove unused Function/singletonList imports
createPartitionRecordComparator now returns RecordComparator instead of Comparator<BinaryRow>, since RecordComparator extends Comparator<InternalRow> and newRecordComparator already returns RecordComparator. This avoids a type incompatibility at the assignment site where the result was used as RecordComparator.
Manifest sorting changes the read order of results across partitions. The test compared results using containsExactly which requires exact order; changed to containsExactlyInAnyOrder to match the same approach used in CompactProcedureTestBase.scala.
Manifest sorting changes read order; the test expected sorted output. Added explicit sort by id to make the test order-independent.
… assertion - testFullCompactionReadManifestsInParallel: removed parallel read assertions since reads are now sequential in mergeSortedByPartition - BlobTableTest/MultipleBlobTableTest: disable MANIFEST_MERGE_SORTED and MANIFEST_DELTA_SORTED since blob/vector-store code depends on original file ordering
…rtions The manifest sorting feature changes the order of results. All containsExactlyElementsOf in IncrementalClusterActionITCase need to be changed to containsExactlyInAnyOrderElementsOf, not just the one in the assertResult helper.
…in Spark tests CompactProcedureTestBase and RescaleProcedureTest had additional containsExactlyElementsOf calls that were not updated in the original PR. All changed to containsExactlyInAnyOrderElementsOf.
- LanceFormatTest: add ORDER BY a to LIMIT queries - SparkWriteITCase.testWriteWithDefaultValue: add ORDER BY a to all SELECT queries whose results are compared via toString()
| private static int mergeSortedByPartition( | ||
| List<ManifestFileMeta> toBeMerged, |
There was a problem hiding this comment.
Thanks @baiyangtx for working on this too. Can this code sort the out-of-order large file manifest that already exists in the table? If the user wants to enable this parameter halfway, it seems that the feature won't work.
There was a problem hiding this comment.
@discivigour Good point — my approach indeed doesn't handle existing out-of-order manifests. I think our two PRs are actually complementary: yours handles the bootstrap/rewrite path for historical manifests, mine maintains sorted order going forward via inline compaction. I'd like to collaborate and align the configuration surface first so users see one coherent "manifest sort" feature rather than two separate mechanisms. WDYT?
JingsongLi
left a comment
There was a problem hiding this comment.
Review: feat(core): support sort manifest entries by partition
Good motivation -- clustering manifest entries by partition will improve partition-pruning scan performance. Below are several issues I found while reviewing.
1. Loss of ADD filter during manifest reads (performance regression)
In the original readForFullCompaction, the code called:
manifestFile.read(file.fileName(), file.fileSize(), FileEntry.addFilter(), Filter.alwaysTrue())Both mergeUnsorted() and mergeSortedByPartition() now call:
manifestFile.read(file.fileName(), file.fileSize())...and then filter FileKind.DELETE in Java. This means DELETE entries are now deserialized from disk only to be discarded. For manifest files with many DELETE entries (e.g., after bulk deletes or partition drops), this is a regression. Please push the add-filter down into the reader call.
2. Parallel manifest reading was silently removed
The original code used sequentialBatchedExecute(reader, toBeMerged, manifestReadParallelism) to read manifest files in parallel. The refactored mergeUnsorted() and mergeSortedByPartition() now iterate sequentially over toBeMerged. For large tables with hundreds of manifest files needing compaction, this could significantly increase compaction latency. The manifestReadParallelism parameter is now effectively unused in the full compaction path.
The test testFullCompactionReadManifestsInParallel was modified to remove the parallelism assertions (maxConcurrentManifestReads), which confirms the feature was dropped rather than preserved.
3. IOManager created from System.getProperty("java.io.tmpdir")
In mergeSortedByPartition():
ioManager = IOManager.create(System.getProperty("java.io.tmpdir"));This bypasses any user-configured temp directory or spill path. The commit code already has access to an IOManager (or its configuration) through the FileStore. Please pass the IOManager (or temp directories) from the caller rather than hard-coding the system temp directory. This also avoids creating a new IOManager per compaction.
4. Test assertions weakened to containsExactlyInAnyOrderElementsOf
Many tests (especially clustering/sort-related ones like testLocalSortClusterUnpartitionedTable) were changed from containsExactlyElementsOf to containsExactlyInAnyOrderElementsOf. This fundamentally weakens the test -- e.g., the comment in IncrementalClusterActionITCase says "verify internal order: within the single output file, rows must be sorted ascending by (a, b)" but then uses an assertion that does not verify ordering.
If the manifest sorting changes the read order for these tests, the correct fix is to add an ORDER BY clause to the query (as done in SparkWriteITCase) rather than dropping the ordering guarantee from the assertion. Otherwise these tests no longer validate that clustering produces correctly sorted data.
5. Inconsistent sort key between delta sort and full compaction sort
createManifestEntryComparator (used for delta sorting) orders by (partition, bucket, level, fileName), while mergeSortedByPartition (used for full compaction) orders by (partition, bucket, level). Without a stable tiebreaker in the full compaction sort, entries with the same (partition, bucket, level) have non-deterministic order. Consider adding fileName to the full compaction comparator as well for consistency and deterministic output.
6. Default values: manifest.delta.sorted=true changes write-path behavior silently
manifest.delta.sorted defaults to true, meaning every commit will now incur sorting overhead on the delta file list. Given that manifest.merge.sort-on-commit defaults to false explicitly to avoid commit-path latency impact, it seems inconsistent that delta sorting is on by default. Consider defaulting to false or at least calling this out in migration/upgrade notes since it changes existing behavior.
Minor
- The variable name
partitionRmprincreateManifestEntrySortBufferseems like a typo. ConsiderpartitionCmprorpartitionComparator. - The overload chain for
merge()andtryFullCompaction()(3 overloads each) is getting long. A parameter object or builder might improve readability in the future.
Overall the feature is valuable and the use of BinaryExternalSortBuffer for spill-to-disk is a sound approach. Addressing the performance regressions (filter pushdown, parallel reads) and the weakened test assertions would strengthen this PR significantly.
1. Restore ADD filter in manifest reads (mergeUnsorted and mergeSortedByPartition) to avoid deserializing DELETE entries 2. Add fileName tiebreaker to full compaction sort key for deterministic output 3. Change manifest.delta.sorted default to false (consistent with sort-on-commit default) 4. Rename partitionRmpr to partitionCmp
Thread @nullable IOManager from caller through tryFullCompaction and merge methods to mergeSortedByPartition. Falls back to java.io.tmpdir when null. Overloads and existing callers pass null to preserve existing behavior.
…UnpartitionedTable Restore containsExactlyElementsOf for the 'verify internal order' assertion. With parallelism=1 and single output file, rows within the file are guaranteed to be sorted by (a, b) per local-sort.
Missed in previous commit - the merge method also needs ioManager parameter threaded through to tryFullCompaction.
|
Thanks @JingsongLi for the thorough review. I've addressed most of the feedback:
For discussion — parallel manifest reads (2): The original |
Purpose
Manifest entries are currently written in arrival order, which scatters entries belonging
to the same partition across multiple manifest files. This leads to:
to find all entries for a single partition.
This PR introduces manifest entry sorting by partition, so that entries for the same
partition are clustered together within manifest files.
Changes
New configuration options:
manifest.merge.sortedtruemanifest.merge.sort-on-commitfalsemanifest.delta.sortedtrueCore implementation:
ManifestFileMerger: IntroducesmergeSortedByPartition()andmergeUnsorted().When
manifest.merge.sortedis enabled, entries are collected into aBinaryExternalSortBuffer(with spill-to-disk support), then written inpartition-major order:
(partition, bucket, level).ManifestFileMerger.createManifestEntryComparator(): Comparator used forsorting delta manifests, falling back to pure-Java comparison when codegen
is unavailable.
FileStoreCommitImpl: Wires sort parameters into all three paths —commit manifest merge, delta file writing, and manifest compaction.
Tests